import os
import shutil
import zipfile
import urllib.request
# Fetch the LinearizedNNs repository and put its src/ directory on the import path.
REPO_ZIP_FILE = 'LinearizedNNs-master.zip'
REPO_PATH = "LinearizedNNs-master"

urllib.request.urlretrieve(
    'https://github.com/maxkvant/LinearizedNNs/archive/master.zip', REPO_ZIP_FILE)

# Remove any stale checkout so the extraction starts clean.
if os.path.exists(REPO_PATH):
    shutil.rmtree(REPO_PATH)
with zipfile.ZipFile(REPO_ZIP_FILE, 'r') as zip_ref:
    zip_ref.extractall('.')
assert os.path.exists(REPO_PATH)

import sys
sys.path.append(f"{REPO_PATH}/src")
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torchvision import transforms, datasets
from torchvision.datasets import FashionMNIST
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import RidgeClassifier
from sklearn.decomposition import PCA
from xgboost import XGBClassifier
from pytorch_impl.nns import ResNet, FCN, CNN
from pytorch_impl.nns import warm_up_batch_norm
from pytorch_impl.estimators import LinearizedSgdEstimator, SgdEstimator, MatrixExpEstimator
from pytorch_impl import ClassifierTraining
from pytorch_impl.matrix_exp import matrix_exp, compute_exp_term
from pytorch_impl.nns.utils import to_one_hot
# Fix the RNG seed for reproducibility and pick the compute device.
torch.manual_seed(0)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('Torch version: {}'.format(torch.__version__))
print('Device: {}'.format(device))
# FashionMNIST: 28x28 grayscale images, 10 classes.
D = 28
num_classes = 10

train_loader = torch.utils.data.DataLoader(
    FashionMNIST(root='.', train=True, download=True, transform=transforms.ToTensor()),
    batch_size=4096,
    shuffle=True,
    pin_memory=True,
)
test_loader = torch.utils.data.DataLoader(
    FashionMNIST(root='.', train=False, transform=transforms.ToTensor()),
    batch_size=4096,
    shuffle=True,
    pin_memory=True,
)
# Quick numerical sanity checks for the project's matrix-exponential helpers,
# using the generator of a 30-degree rotation.  (The original notebook computed
# the angle and generator twice with identical values; once is equivalent.)
angle = 30 / 180 * np.pi
generator = torch.tensor([[0, -1], [1, 0]]) * angle

# exp(generator) — displayed in the original notebook, discarded in script form.
matrix_exp(generator, device)

# Identity check: generator @ compute_exp_term(generator) + I should equal
# exp(generator); result was displayed in the notebook, discarded here.
generator_on_device = generator.clone().to(device)
torch.matmul(generator_on_device, compute_exp_term(generator, device)) + torch.eye(2).to(device)
# Constructor smoke test: CNN with a single output and one input channel.
CNN(1, input_channels=1)

# Peek at one training batch; the size was displayed in the notebook and is
# discarded in script form.
_, (X, y) = next(enumerate(train_loader))
X.size()

# Working model: warm up batch-norm running statistics on real data before
# the linearized fit.
model = CNN(1, input_channels=1, num_channels=256).to(device)
warm_up_batch_norm(model, train_loader, device)

estimator = MatrixExpEstimator(model, num_classes, device, learning_rate=1e1, momentum=0.)

# Fit on a single (shuffled) batch; the resulting test accuracy was a
# displayed cell in the notebook and is discarded in script form.
_, (X, y) = next(enumerate(train_loader))
X, y = X.to(device), y.to(device)
estimator.fit(X, y)
ClassifierTraining(estimator, device).get_accuracy(test_loader)
def get_estimator(model, num_classes):
    """Return a fresh MatrixExpEstimator for `model` on the shared device."""
    fresh_estimator = MatrixExpEstimator(model, num_classes, device, learning_rate=10.)
    return fresh_estimator
# Weight-averaging ensemble: fit an independent MatrixExpEstimator on each
# full-size training batch, then average the fitted weights into a single
# estimator and evaluate it.
ws_sum = get_estimator(model, num_classes).ws.detach()  # unfitted ws seeds the accumulator
prev_size = None
batches = 0
for batch_id, (X, y) in enumerate(train_loader):
    # Stop at the first ragged (smaller) trailing batch so every term in the
    # average carries equal weight.
    if (prev_size is not None) and prev_size != len(X):
        break
    prev_size = len(X)
    X, y = X.to(device), y.to(device)
    per_batch_estimator = get_estimator(model, num_classes)
    per_batch_estimator.fit(X, y)
    ws_sum += per_batch_estimator.ws.detach()
    batches += 1

estimator = get_estimator(model, num_classes)
estimator.ws = ws_sum / batches
# Was a bare display cell in the notebook; in a script the result was silently
# discarded — print the ensemble's test accuracy instead.
print(ClassifierTraining(estimator, device).get_accuracy(test_loader))
# Baseline: plain SGD on a 10-class CNN (long schedule).
model = CNN(10, input_channels=1).to(device)
warm_up_batch_norm(model, train_loader, device)
learning_rate = 0.005
estimator = SgdEstimator(model, nn.CrossEntropyLoss(), learning_rate)
training = ClassifierTraining(estimator, device)
training.train(train_loader, test_loader,
               num_epochs=200, learning_rate=learning_rate)

# Comparison on fully-connected nets: linearized SGD vs plain SGD.
learning_rate = 0.02
linearized_estimator = LinearizedSgdEstimator(
    FCN(1, D * D).to(device), num_classes, nn.CrossEntropyLoss(), learning_rate)
linearized_training = ClassifierTraining(linearized_estimator, device)
linearized_training.train(train_loader, test_loader,
                          num_epochs=10, learning_rate=learning_rate)

estimator = SgdEstimator(FCN(10, D * D).to(device), nn.CrossEntropyLoss(), learning_rate)
training = ClassifierTraining(estimator, device)
training.train(train_loader, test_loader,
               num_epochs=10, learning_rate=learning_rate)
# Fit the SGD estimator on one more training batch and inspect its outputs.
# These were display cells in the notebook; as a script the values were
# silently discarded, so print them.  next(iter(loader)) is the idiomatic
# single-batch fetch (the original used next(enumerate(loader)) and threw
# away the index).
X, y = next(iter(train_loader))
X, y = X.to(device), y.to(device)
estimator.fit(X, y)
print(estimator.predict(X).size())
# Mean squared error of predictions against one-hot targets.
print(((estimator.predict(X) - to_one_hot(y, num_classes).to(device)) ** 2).mean())
print(estimator)
# Accuracy on a single test batch.
X, y = next(iter(test_loader))
X, y = X.to(device), y.to(device)
print((torch.argmax(estimator.predict(X), dim=1) == y).double().mean())
# ---- CIFAR-10 experiment setup ----
torch.manual_seed(0)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Per-channel statistics used for input normalization.
cifar10_stats = {
    "mean" : (0.4914, 0.4822, 0.4465),
    "std" : (0.24705882352941178, 0.24352941176470588, 0.2615686274509804),
}

def _reflect_pad(img):
    # Reflect-pad 4 px on each spatial side so RandomCrop(32) can shift the
    # image; equivalent to the original chain of three transforms.Lambda steps
    # (to array -> np.pad -> back to PIL).
    padded = np.pad(np.asarray(img), [(4, 4), (4, 4), (0, 0)], mode='reflect')
    return Image.fromarray(padded)

transform_train = transforms.Compose([
    transforms.Lambda(_reflect_pad),
    transforms.RandomCrop(32),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(cifar10_stats['mean'], cifar10_stats['std']),
])
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(cifar10_stats['mean'], cifar10_stats['std']),
])

train_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train),
    batch_size=2048, shuffle=True, pin_memory=True)
test_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test),
    batch_size=2048, shuffle=True, pin_memory=True)

device  # no-op here; displayed the selected device in the original notebook
#### TODO